I already wrote about query performance analysis in lakehouses, but one secret was left out: the capability to track queries back to the semantic model, report and visual that generated them, identifying the query lineage.
If you check the text of the queries, you will find content like this at the end:
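(Reconstructed here with placeholder values; the structure follows the JSON paths used in the extraction queries later in this article.)

OPTION (LABEL = '{"DatasetId":"<dataset guid>","Sources":[{"ReportId":"<report guid>","VisualId":"<visual guid>","Operation":"<operation>"}]}')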
This has an interesting meaning:
- We can use this information to track the query lineage
- Applications can send lineage (or more) to SQL using the OPTION (LABEL) clause
Applications Tracking Lineage
If you develop applications consuming data from a lakehouse SQL Endpoint, they can track the query lineage using OPTION (LABEL) in the SQL statement.
This option is not only accepted by the lakehouse: the lakehouse processes it, extracts the JSON it contains and includes it in the “label” field of the “queryinsights” views.
The JSON doesn’t need to have this exact format: it can contain anything your application needs, and when using custom applications, you can expand the tracking well beyond query lineage.
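As a sketch, a custom application could attach its own context to each query it sends; the table name and JSON fields below are purely illustrative, not a required format:

SELECT SUM(sales_amount) AS total_sales
FROM dbo.fact_sales
OPTION (LABEL = '{"Application":"InventoryApp","Module":"DailyRefresh","RunId":"2024-12-02-01"}')

Whatever is sent in the label appears in the “label” field of queryinsights, ready to be parsed with the same technique shown below.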
Retrieving The Lineage from QueryInsights
The lineage information received with the query is stored in a field called “label”, in JSON format.
We need to use JSON functions to extract the four values from the field: DatasetId, ReportId, VisualId and Operation. Once we confirm the label field contains valid JSON, we can extract each value from it.
This is an example of extracting the DatasetId value:
CASE
    WHEN ISJSON(label) = 1 THEN JSON_VALUE(label, '$.DatasetId')
    ELSE NULL
END AS DatasetId
The other values are slightly different because they are located in a different position in the JSON:
CASE
    WHEN ISJSON(label) = 1 THEN JSON_VALUE(label, '$.Sources[0].ReportId')
    ELSE NULL
END AS ReportId
The Complete Query and Execution
Let’s consider the query below to retrieve this information:
SELECT TOP 20 distributed_statement_id,
       program_name,
       data_scanned_disk_mb,
       data_scanned_memory_mb,
       data_scanned_remote_storage_mb,
       REPLACE(REPLACE(command, CHAR(13), ''), CHAR(10), '') AS command,
       total_elapsed_time_ms,
       start_time,
       end_time,
       allocated_cpu_time_ms,
       status,
       row_count,
       CASE
           WHEN ISJSON(label) = 1 THEN JSON_VALUE(label, '$.DatasetId')
           ELSE NULL
       END AS DatasetId,
       CASE
           WHEN ISJSON(label) = 1 THEN JSON_VALUE(label, '$.Sources[0].ReportId')
           ELSE NULL
       END AS ReportId,
       CASE
           WHEN ISJSON(label) = 1 THEN JSON_VALUE(label, '$.Sources[0].VisualId')
           ELSE NULL
       END AS VisualId,
       CASE
           WHEN ISJSON(label) = 1 THEN JSON_VALUE(label, '$.Sources[0].Operation')
           ELSE NULL
       END AS Operation,
       label
FROM   queryinsights.exec_requests_history
WHERE  program_name IN (
           'Core .Net SqlClient Data Provider',
           '.Net SqlClient Data Provider',
           'Framework Microsoft SqlClient Data Provider',
           'PowerBIPremium-DirectQuery' )
       AND start_time > '2024-12-02'
       AND command NOT LIKE '%sys.sp_set_session_context%'
       AND status = 'Succeeded'
ORDER  BY total_elapsed_time_ms DESC
Our focus is the lineage, but the query above has some interesting details about analysing the performance of a lakehouse:
- It filters only the queries with specific program names. These are program names from client applications, including Power BI. You can decide to filter for a specific set of them.
- It filters out queries containing 'sys.sp_set_session_context'. This is a statement generated by client libraries when starting a session for a user.
- The query only retrieves “Succeeded” results. You can retrieve the failed ones instead for a different analysis, or not filter by status at all.
- The query uses TOP 20 ordered by total_elapsed_time_ms in descending order. This results in a list of the worst queries according to the total elapsed time. You can create different queries using the same method with other metrics available in this view, as shown in the example after this list.
- It’s filtering by start_time. In this way, it limits the queries and avoids retrieving old ones.
- It’s removing line breaks from the query text.
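For example, to rank the queries by CPU consumption instead of elapsed time, only the final line of the query needs to change:

ORDER BY allocated_cpu_time_ms DESC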
This query needs to be executed against a SQL endpoint. One possible method to automate it is to use notebooks. I wrote about how to run SQL Endpoint queries in notebooks.
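As a minimal sketch of that approach, assuming pyodbc and an ODBC driver are available in the notebook environment (the endpoint address, database name and token audience below are placeholders and assumptions to adjust for your environment):

import struct
import pandas as pd
import pyodbc
import notebookutils  # Fabric notebook utilities

# Assumptions: replace with your SQL endpoint address and lakehouse name
sql_endpoint = "<your-endpoint>.datawarehouse.fabric.microsoft.com"
database = "<your lakehouse>"

# Acquire an Entra token for the connection (the audience may differ in your environment)
token = notebookutils.credentials.getToken("https://database.windows.net/").encode("utf-16-le")
token_struct = struct.pack(f"<I{len(token)}s", len(token), token)

connection = pyodbc.connect(
    f"Driver={{ODBC Driver 18 for SQL Server}};Server={sql_endpoint};Database={database}",
    attrs_before={1256: token_struct}  # 1256 = SQL_COPT_SS_ACCESS_TOKEN
)

# lineage_query holds the SQL text shown above; the result becomes a Spark dataframe
query_results = spark.createDataFrame(pd.read_sql(lineage_query, connection))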
Translating the Object Names
We can use the sempy library to translate the Ids of semantic models and reports into their names. There are different methods to achieve this, each with some limitations.
This kind of translation is always based on the workspace. This means we can’t simply ask to translate an Id; we need to know which workspace the object is in.
If we list all possible workspaces, the result will contain only the workspaces we have access to. There is no single API call to translate an object id regardless of where the object is.
The sempy library has a method called resolve_item_name. One of the parameters is the workspace id, so this method can only be used if you already know where to look.
The problem with resolve_item_name is that it needs to be executed once for each id to be resolved. We may have many Ids to resolve, resulting in too many API calls.
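As an illustration of a single lookup (the parameter names follow the sempy documentation, but verify them against your installed version; the id and workspace are placeholders):

import sempy.fabric as fabric

item_name = fabric.resolve_item_name(
    item_id="aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",  # placeholder id
    type="SemanticModel",
    workspace="My Workspace"  # we must already know where the item lives
)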
A better option is to use the method list_items to retrieve all the items from a workspace. We can transform the list into a dataframe and translate the names in the query result. Calling list_items once per workspace allows the search to cover multiple workspaces.
Depending on your needs, you may have a list of workspaces where you should search, or you may need to search in all the available ones. You can create a fixed list with some workspace names or use the method list_workspaces to retrieve a list of all workspaces you have permission to access.
PySpark Code Example
The code below is one example, among many possibilities, of translating the Ids inside a dataframe called query_results.
import sempy.fabric as fabric
import pandas as pd

# Authenticate and get the workspaces on dedicated capacity
workspaces = fabric.list_workspaces().query('`Is On Dedicated Capacity` == True').Id

# List semantic models and reports, one call per workspace and per type
datasetItemsList = [fabric.list_items("SemanticModel", wsId) for wsId in workspaces]
datasetItems = pd.concat(datasetItemsList, ignore_index=True)

reportItemsList = [fabric.list_items("Report", wsId) for wsId in workspaces]
reportItems = pd.concat(reportItemsList, ignore_index=True)

# Build lookup dictionaries from Id to Display Name
dataset_mapping = {row["Id"]: row["Display Name"] for _, row in datasetItems.iterrows()}
report_mapping = {row["Id"]: row["Display Name"] for _, row in reportItems.iterrows()}

# Convert the query result into a list of dictionaries
rows = query_results.select('*').collect()
query_results_list = [row.asDict() for row in rows]

# Translate DatasetId and ReportId to their corresponding names
for row in query_results_list:
    if row["DatasetId"] in dataset_mapping:
        row["DatasetName"] = dataset_mapping[row["DatasetId"]]  # Add the semantic model name
    if row["ReportId"] in report_mapping:
        row["ReportName"] = report_mapping[row["ReportId"]]  # Add the report name

# Convert back to a Spark DataFrame
query_results = spark.createDataFrame(query_results_list)
- The list of workspaces is generated, and some pandas shortcuts are used to reduce it to the collection of Ids passed to list_items
- We list semantic models and reports. list_items doesn’t filter by multiple types: either we list all types or we execute it once per type. If we decide to list all types, Sandeep Pawar described how to list everything in a single line of code
- Two dictionaries are built to be used as lookups when translating the Ids
- The dataframe is converted to a list of dictionaries
- All the translation happens on these dictionaries
- The result is converted back to a dataframe
Conclusion
Using this information, it’s possible to create a report to analyse the performance of a lakehouse, highlighting the most expensive queries from each semantic model and the most expensive semantic models, and identifying where the optimization focus needs to be.